package com.starmark.message.queue.consumer.rabbitmq.service.impl;

import com.starmark.message.queue.consumer.api.service.IMessageQueueConsumerService;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Server-side listener setup for the message queue: starts one RabbitMQ listener container
 * for every {@link IMessageQueueConsumerService} that reports support for {@code "rabbitmq"}.
 *
 * <p>For each such service, {@link #afterPropertiesSet()} registers a {@link Queue} bean
 * definition named after the service's topic (unless a bean with that name already exists)
 * and starts a {@link SimpleMessageListenerContainer} listening on that queue name. The
 * containers are instantiated directly in this class rather than obtained from the
 * application context, so this class keeps track of them and stops them in {@link #destroy()}.
 *
 * @author starmark
 * @date 2020/5/1 10:55 AM
 */
@Component
public class MessageQueueRabbitmqConsumerServiceFactory implements InitializingBean, DisposableBean {

    /** Queue type passed to {@link IMessageQueueConsumerService#support(String)} to select consumers. */
    private static final String QUEUE_TYPE = "rabbitmq";

    /** Consumer start timeout (milliseconds) applied to every listener container. */
    private static final long CONSUMER_START_TIMEOUT_MS = 6000L;

    // Auto-injected RabbitTemplate.
    // NOTE(review): not referenced anywhere in this class; kept so the bean's injection
    // requirements stay exactly as before — confirm whether it can be removed.
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final ConfigurableApplicationContext applicationContext;
    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
    private final ConnectionFactory connectionFactory;

    /** Containers created by {@link #afterPropertiesSet()}, retained so they can be stopped later. */
    private final List<SimpleMessageListenerContainer> containers = new ArrayList<>();

    /**
     * Creates the factory, keeping only the consumer services that support the
     * {@code "rabbitmq"} queue type.
     *
     * @param messageQueueConsumerServiceList all consumer services available for injection
     * @param applicationContext              context used to look up and register queue beans
     * @param connectionFactory               connection factory handed to every listener container
     */
    @Autowired
    public MessageQueueRabbitmqConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList, ConfigurableApplicationContext applicationContext, ConnectionFactory connectionFactory) {
        this.messageQueueConsumerServices = messageQueueConsumerServiceList.stream()
                .filter(consumerService -> consumerService.support(QUEUE_TYPE))
                .collect(Collectors.toList());
        this.applicationContext = applicationContext;
        this.connectionFactory = connectionFactory;
    }

    /**
     * Registers a queue bean and starts a listener container for every selected consumer service.
     *
     * <p>If any step fails, the containers created so far are stopped before the exception is
     * rethrown, so a failed startup does not leave consumers running.
     */
    @Override
    public void afterPropertiesSet() {
        try {
            for (IMessageQueueConsumerService consumerService : messageQueueConsumerServices) {
                startListening(consumerService);
            }
        } catch (RuntimeException startFailure) {
            try {
                stopContainers();
            } catch (RuntimeException stopFailure) {
                // Keep the original startup failure as the primary exception.
                startFailure.addSuppressed(stopFailure);
            }
            throw startFailure;
        }
    }

    /**
     * Stops every listener container started by this factory.
     */
    @Override
    public void destroy() {
        stopContainers();
    }

    /**
     * Registers the queue bean for the service's topic and starts a container that delivers
     * messages from that queue to the service.
     *
     * @param consumerService the consumer service to wire to its queue
     */
    private void startListening(IMessageQueueConsumerService consumerService) {
        String topic = consumerService.topic();
        // The topic is used both as the bean name and as the Queue constructor argument.
        registerBean(topic, topic);

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setConsumerStartTimeout(CONSUMER_START_TIMEOUT_MS);
        // Set the name of the queue to listen on.
        container.setQueueNames(topic);
        container.setMessageListener(new MessageQueueRabbitmqConsumerListener(consumerService));
        // Track the container before starting it so that a failed start is still cleaned up.
        containers.add(container);
        container.start();
    }

    /**
     * Stops all tracked containers, attempting every one even if an earlier stop fails.
     * The first failure is rethrown afterwards with any later failures attached as suppressed.
     */
    private void stopContainers() {
        RuntimeException failure = null;
        for (SimpleMessageListenerContainer container : containers) {
            try {
                container.stop();
            } catch (RuntimeException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        containers.clear();
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Registers a {@link Queue} bean definition under the given name, unless the application
     * context already contains a bean with that name.
     *
     * @param name bean name to register the queue under
     * @param args constructor argument values for {@link Queue}, added in the given order
     */
    private void registerBean(String name, Object... args) {
        if (applicationContext.containsBean(name)) {
            return;
        }
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(Queue.class);
        for (Object arg : args) {
            builder.addConstructorArgValue(arg);
        }
        BeanDefinition queueDefinition = builder.getRawBeanDefinition();

        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
        registry.registerBeanDefinition(name, queueDefinition);
    }
}
